package com.dec.kks.etl.producer;

import com.google.gson.Gson;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Map;

/**
 * Test driver that writes sample KKS data to Kafka.
 *
 * <p>Parses the embedded sample JSON document into a {@code KKSEntity}, then repeatedly
 * overwrites its {@code collect_time} with an increasing message id and every packet's
 * {@code src_value} with a random number, serialises the entity back to JSON and sends it
 * to the Kafka topic, pausing between messages.
 *
 * <ul>
 * <li>Description: KKSDATAWriteToKafka</li>
 * <li>Date: 18-4-20 10:16 AM</li>
 * </ul>
 *
 * @author liukailong &lt;kailong.liu@cdcalabar.com&gt;
 * @version 2.0.0
 * @since JDK 1.8
 */
public class ProducerMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerMain.class);

    /** Kafka topic the sample messages are sent to. */
    private static final String TOPIC = "spark-topic";

    /** Directory handed to {@code DECConfigUtil.loadConfig}. */
    private static final String CONFIG_PATH = "/home/hdfs/soft/IdeaProjects/dec-kks-etl/src/main/resources";

    /** Pause between two consecutive messages, in milliseconds. */
    private static final int SLEEP_MILLIS = 1000;

    /** Number of messages to send before the producer stops. */
    private static final int MAX_MESSAGES = 300 * 10000;

    /** Sample payload used as the template for every message. */
    // language=JSON
    static String json = "{\n" +
            " \"power_type\": \"T\",\n" +
            " \"power_code\": \"001\",\n" +
            " \"power_unit_code\": \"02\",\n" +
            " \"collect_time\":151,\n" +
            " \"packeted_data\": {\n" +
            "  \"000000008347\": {\n" +
            "   \"src_code\": \"10BAY10AG001CG01.PV\",\n" +
            "   \"substd_code\": \"xxxx\",\n" +
            "   \"src_desc\": \"xxxx\",\n" +
            "   \"src_value\": 21.931957,\n" +
            "   \"src_time\": 1517450744000,\n" +
            "   \"src_data_quality\": 192,\n" +
            "   \"std_code\": \"xxxx\",\n" +
            "   \"std_time\": 15174507446666,\n" +
            "   \"std_value\": 20.76,\n" +
            "   \"std_data_quality\": 0,\n" +
            "   \"set_code\": \"xxxxx\"\n" +
            "  },\n" +
            "  \"000000008348\": {\n" +
            "   \"src_code\": \"110BAY10AG001CG02.PV\",\n" +
            "   \"substd_code\": \"xxxx\",\n" +
            "   \"src_desc\": \"xxxx\",\n" +
            "   \"src_value\": 21.931957,\n" +
            "   \"src_time\": 1517450744000,\n" +
            "   \"src_data_quality\": 192,\n" +
            "   \"std_code\": \"xxxx\",\n" +
            "   \"std_time\": 15174507446666,\n" +
            "   \"std_value\": 20.76,\n" +
            "   \"std_data_quality\": 0,\n" +
            "   \"set_code\": \"xxxxx\"\n" +
            "  },\n" +
            "  \"000000008342\": {\n" +
            "   \"src_code\": \"10MAX10AX001CG04.PV\",\n" +
            "   \"substd_code\": \"xxxx\",\n" +
            "   \"src_desc\": \"xxxx\",\n" +
            "   \"src_value\": 21.931957,\n" +
            "   \"src_time\": 1517450744000,\n" +
            "   \"src_data_quality\": 192,\n" +
            "   \"std_code\": \"xxxx\",\n" +
            "   \"std_time\": 15174507446666,\n" +
            "   \"std_value\": 20.76,\n" +
            "   \"std_data_quality\": 0,\n" +
            "   \"set_code\": \"xxxxx\"\n" +
            "  }\n" +
            " }\n" +
            "}";

    /**
     * Sends {@link #MAX_MESSAGES} randomised copies of the sample payload to {@link #TOPIC},
     * one every {@link #SLEEP_MILLIS} milliseconds, and closes the producer afterwards.
     *
     * @param args command-line arguments; not used
     * @throws Exception if loading the configuration, sending a message or sleeping fails
     */
    public static void main(String[] args) throws Exception {
        Gson gson = new Gson();
        KKSEntity input = gson.fromJson(json, KKSEntity.class);

        // Raw types are kept on purpose: the signatures of the project helpers are not
        // visible from this file, so parameterising them here could break compilation.
        // NOTE(review): the meaning of the second loadConfig argument (1) is defined by
        // DECConfigUtil - confirm there.
        Map config = DECConfigUtil.loadConfig(CONFIG_PATH, 1);
        KafkaProducer producer = DECKafkaUtil.initProducer(config);

        try {
            // Message ids start at the current epoch second and increase by one per message.
            final int firstMsgId = Math.toIntExact(System.currentTimeMillis() / 1000);
            // The stop value is fixed up front. Comparing the running id against itself plus
            // an offset can never be true, which would turn this into an endless loop.
            final int lastMsgId = Math.addExact(firstMsgId, MAX_MESSAGES);

            int msgId = firstMsgId;
            while (true) {
                msgId++;
                input.setCollect_time(msgId);

                // Give every packet a fresh random source value between 0 and 10000.
                Map<String, PacketData> packetData = input.getPacketed_data();
                for (PacketData data : packetData.values()) {
                    data.setSrc_value(RandomUtils.nextDouble(0, 10000));
                }
                DECKafkaUtil.sendMsg(producer, TOPIC, null, gson.toJson(input));

                if (msgId >= lastMsgId) {
                    break;
                }
                System.out.println(msgId);
                Thread.sleep(SLEEP_MILLIS);
            }
            LOGGER.info("Finished sending {} messages to topic {}", MAX_MESSAGES, TOPIC);
        } finally {
            // Release the Kafka client on normal completion as well as on failure.
            producer.close();
        }
    }

}
